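Yesterday we built the chat-service and the RAG retrieval system, but one big user-experience problem remains: waiting on a black box. After asking a question, users can only sit and wait 10-30 seconds with no idea what the system is doing.
This post uses GCP-native services to build real-time push updates, so users can see the progress of every processing step.
Goal: a zero-maintenance real-time push system that supports concurrent users and still delivers notifications when they are offline.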
flowchart TB
    subgraph "Frontend layer"
        WEB[Web App<br/>React/Vue]
        MOBILE[Mobile App<br/>Flutter/React Native]
        FIREBASE_SDK[Firebase SDK<br/>Real-time listeners]
    end
    subgraph "GCP managed services layer"
        FIRESTORE[Firestore<br/>Real-time database]
        FUNCTIONS[Cloud Functions<br/>Event handling]
        PUBSUB[Pub/Sub<br/>Event bus]
        EVENTARC[Eventarc<br/>Event routing]
        FCM[Firebase Cloud Messaging<br/>Push notifications]
    end
    subgraph "Application services layer"
        CHAT[chat-service]
        RAG[rag-service<br/>Agent Builder]
        WORKER[worker-service<br/>Cloud Run Jobs]
    end
    subgraph "Real-time data flow"
        WEB --> FIREBASE_SDK
        MOBILE --> FIREBASE_SDK
        FIREBASE_SDK <--> FIRESTORE
        CHAT --> PUBSUB
        RAG --> PUBSUB
        WORKER --> PUBSUB
        PUBSUB --> EVENTARC
        EVENTARC --> FUNCTIONS
        FUNCTIONS --> FIRESTORE
        FUNCTIONS --> FCM
    end
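| Requirement | ❌ Traditional WebSocket | ✅ Firebase approach | Advantage |
|---|---|---|---|
| Real-time connection | Self-hosted WebSocket server | Firestore real-time listeners | Zero maintenance, auto-scaling |
| Reconnection | Hand-written reconnect logic | Handled automatically by the Firebase SDK | More stable |
| Load balancing | Sticky-session configuration | Distributed automatically by Firebase | Stateless, more flexible |
| Offline support | Custom cache layer required | Built-in offline cache | Better user experience |
| Cross-platform | Different implementation per platform | One unified Firebase SDK | Faster development |
| Scalability | Handle concurrency yourself | Google infrastructure | Million-scale concurrency |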
// Firestore collection structure
collections:
  chats: {
    [chatId]: {
      user_id: string,
      status: 'pending' | 'processing' | 'completed' | 'error',
      current_step: string,
      progress: number,         // overall progress, 0-100
      steps: {
        'receive_message': { status: 'completed', timestamp: '...' },
        'rag_search': { status: 'processing', progress: 30 },
        'llm_generation': { status: 'pending' },
        'response_ready': { status: 'pending' }
      },
      messages: [
        {
          role: 'user',
          content: string,
          timestamp: timestamp
        },
        {
          role: 'assistant',
          content: string,
          timestamp: timestamp,
          sources: [...],          // RAG sources
          partial: boolean         // whether this is partial content
        }
      ],
      metadata: {
        processing_time_ms: number,
        rag_results_count: number,
        error_message: string
      },
      created_at: timestamp,
      updated_at: timestamp
    }
  },
  user_sessions: {
    [userId]: {
      active_chats: [chatId, ...],
      preferences: {...},
      last_seen: timestamp
    }
  }
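The schema above can be mirrored in a small Python helper for tests or seeding. This is only a sketch: `STEP_ORDER`, `new_chat_document`, and `validate_chat_document` are hypothetical names introduced here, and the step order is assumed from the order in which the steps are listed in the schema.

```python
from datetime import datetime, timezone
from typing import Any, Dict

# Step names come from the schema above; treating this as the processing
# order is an assumption.
STEP_ORDER = ["receive_message", "rag_search", "llm_generation", "response_ready"]
VALID_STATUSES = {"pending", "processing", "completed", "error"}


def new_chat_document(user_id: str, message: str, now: datetime) -> Dict[str, Any]:
    """Build the initial chat document: first step done, the rest pending."""
    steps: Dict[str, Dict[str, Any]] = {name: {"status": "pending"} for name in STEP_ORDER}
    steps["receive_message"] = {"status": "completed", "timestamp": now}
    return {
        "user_id": user_id,
        "status": "pending",
        "current_step": "receive_message",
        "progress": 0,
        "steps": steps,
        "messages": [{"role": "user", "content": message, "timestamp": now}],
        "metadata": {},
        "created_at": now,
        "updated_at": now,
    }


def validate_chat_document(doc: Dict[str, Any]) -> None:
    """Raise ValueError when a document breaks the schema's basic invariants."""
    if doc["status"] not in VALID_STATUSES:
        raise ValueError(f"unknown status: {doc['status']!r}")
    if not 0 <= doc["progress"] <= 100:
        raise ValueError(f"progress out of range: {doc['progress']}")
    for name, step in doc["steps"].items():
        if step["status"] not in VALID_STATUSES:
            raise ValueError(f"step {name!r} has unknown status {step['status']!r}")


doc = new_chat_document("user-1", "hello", datetime.now(timezone.utc))
validate_chat_document(doc)  # passes silently for a well-formed document
```

Keeping the validator separate from the builder lets the same check run against documents read back from Firestore.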
// firestore.rules
rules_version = '2';
service cloud.firestore {
  match /databases/{database}/documents {
    // Chat document permissions
    match /chats/{chatId} {
      allow read, write: if request.auth != null &&
        request.auth.uid == resource.data.user_id;
    }
    // User session permissions
    match /user_sessions/{userId} {
      allow read, write: if request.auth != null &&
        request.auth.uid == userId;
    }
    // Deny every other path by default to block malicious writes
    match /{document=**} {
      allow read, write: if false;
    }
  }
}
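To reason about who these rules let through, the two conditions can be restated as plain Python predicates. This is a rough mental model, not how Firestore evaluates rules internally; `can_access_chat` and `can_access_session` are hypothetical names. Two things are worth noticing: a chat document that does not exist yet has no `resource.data.user_id` to compare against, so a client request for it is not allowed, and server code using the Admin SDK is not subject to these rules at all.

```python
from typing import Any, Mapping, Optional


def can_access_chat(auth_uid: Optional[str], resource: Optional[Mapping[str, Any]]) -> bool:
    """Restates the /chats/{chatId} rule: signed in and owner of an existing document."""
    if auth_uid is None:   # request.auth != null
        return False
    if resource is None:   # no stored document, so there is no user_id to match
        return False
    return resource.get("user_id") == auth_uid


def can_access_session(auth_uid: Optional[str], path_user_id: str) -> bool:
    """Restates the /user_sessions/{userId} rule: path segment equals the caller's uid."""
    return auth_uid is not None and auth_uid == path_user_id


print(can_access_chat("alice", {"user_id": "alice"}))  # owner of an existing chat
print(can_access_chat("alice", None))                  # chat not created yet
```

In practice this means the backend should create the chat document before a client starts listening to it.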
#!/bin/bash
# scripts/setup-firestore.sh
PROJECT_ID="your-project-id"
echo "🔥 Setting up Firebase and Firestore..."
# 1. Enable the Firebase and Firestore APIs
gcloud services enable firebase.googleapis.com --project="$PROJECT_ID"
gcloud services enable firestore.googleapis.com --project="$PROJECT_ID"
# 2. Add Firebase to the GCP project (if not done yet)
firebase projects:addfirebase "$PROJECT_ID"
# 3. Create the Firestore database
gcloud firestore databases create --location=asia-east1 --project="$PROJECT_ID"
# 4. Deploy the security rules
firebase deploy --only firestore:rules --project "$PROJECT_ID"
# 5. Create the composite index
gcloud firestore indexes composite create \
    --project="$PROJECT_ID" \
    --collection-group=chats \
    --field-config=field-path=user_id,order=ascending \
    --field-config=field-path=updated_at,order=descending
echo "✅ Firestore setup complete"
# shared/firebase_client.py
import firebase_admin
from firebase_admin import credentials, firestore, messaging
from typing import Dict, Any, Optional, List
import logging
import json
from datetime import datetime
logger = logging.getLogger(__name__)
class FirebaseClient:
    """Unified Firebase client wrapper"""
    def __init__(self, project_id: str):
        self.project_id = project_id
        # Initialize Firebase Admin once per process
        if not firebase_admin._apps:
            cred = credentials.ApplicationDefault()
            firebase_admin.initialize_app(cred, {
                'projectId': project_id
            })
        # Initialize the Firestore client
        self.db = firestore.client()
    async def create_chat_session(self, chat_id: str, user_id: str, initial_message: str) -> bool:
        """Create a new chat session"""
        try:
            chat_ref = self.db.collection('chats').document(chat_id)
            chat_data = {
                'user_id': user_id,
                'status': 'pending',
                'current_step': 'receive_message',
                'progress': 0,
                'steps': {
                    'receive_message': {
                        'status': 'completed',
                        'timestamp': datetime.now()
                    },
                    'rag_search': {
                        'status': 'pending'
                    },
                    'llm_generation': {
                        'status': 'pending'
                    },
                    'response_ready': {
                        'status': 'pending'
                    }
                },
                'messages': [
                    {
                        'role': 'user',
                        'content': initial_message,
                        'timestamp': datetime.now()
                    }
                ],
                'metadata': {},
                'created_at': datetime.now(),
                'updated_at': datetime.now()
            }
            chat_ref.set(chat_data)
            # Update the user's session
            await self._update_user_session(user_id, chat_id)
            return True
        except Exception as e:
            logger.error(f"Failed to create chat session: {e}")
            return False
    async def update_chat_progress(
        self,
        chat_id: str,
        step: str,
        progress: int,
        status: str = 'processing',
        message: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """Update chat progress"""
        try:
            chat_ref = self.db.collection('chats').document(chat_id)
            # Prepare the update payload; dotted keys address nested map fields
            update_data = {
                'current_step': step,
                'progress': progress,
                'status': status,
                f'steps.{step}.status': status,
                f'steps.{step}.timestamp': datetime.now(),
                'updated_at': datetime.now()
            }
            # Compare against None so that a progress of 0 is still recorded
            if progress is not None:
                update_data[f'steps.{step}.progress'] = progress
            if message:
                update_data[f'steps.{step}.message'] = message
            if metadata:
                for key, value in metadata.items():
                    update_data[f'metadata.{key}'] = value
            # All fields are written in a single atomic update
            chat_ref.update(update_data)
            logger.info(f"Chat {chat_id} progress updated: {step} - {progress}%")
        except Exception as e:
            logger.error(f"Failed to update chat progress: {e}")
    async def add_message(
        self,
        chat_id: str,
        role: str,
        content: str,
        partial: bool = False,
        sources: List[Dict] = None
    ):
        """Append a message to the chat"""
        try:
            chat_ref = self.db.collection('chats').document(chat_id)
            new_message = {
                'role': role,
                'content': content,
                'timestamp': datetime.now(),
                'partial': partial
            }
            if sources:
                new_message['sources'] = sources
            # Append the message with ArrayUnion
            chat_ref.update({
                'messages': firestore.ArrayUnion([new_message]),
                'updated_at': datetime.now()
            })
        except Exception as e:
            logger.error(f"Failed to add message: {e}")
    async def complete_chat(
        self,
        chat_id: str,
        final_response: str,
        processing_time_ms: int,
        sources: List[Dict] = None
    ):
        """Complete the chat session"""
        try:
            chat_ref = self.db.collection('chats').document(chat_id)
            # Append the final response
            if final_response:
                await self.add_message(
                    chat_id,
                    'assistant',
                    final_response,
                    partial=False,
                    sources=sources
                )
            # Mark the chat as completed
            chat_ref.update({
                'status': 'completed',
                'current_step': 'response_ready',
                'progress': 100,
                'steps.response_ready.status': 'completed',
                'steps.response_ready.timestamp': datetime.now(),
                'metadata.processing_time_ms': processing_time_ms,
                'updated_at': datetime.now()
            })
        except Exception as e:
            logger.error(f"Failed to complete chat: {e}")
    async def error_chat(self, chat_id: str, error_message: str):
        """Mark the chat as failed"""
        try:
            chat_ref = self.db.collection('chats').document(chat_id)
            chat_ref.update({
                'status': 'error',
                'metadata.error_message': error_message,
                'updated_at': datetime.now()
            })
        except Exception as e:
            logger.error(f"Failed to set chat error status: {e}")
    async def _update_user_session(self, user_id: str, chat_id: str):
        """Update the user's session info"""
        try:
            user_ref = self.db.collection('user_sessions').document(user_id)
            # set(..., merge=True) creates the document when it is missing and
            # merges otherwise, so no update-then-create fallback is needed
            user_ref.set({
                'active_chats': firestore.ArrayUnion([chat_id]),
                'last_seen': datetime.now()
            }, merge=True)
        except Exception as e:
            logger.error(f"Failed to update user session: {e}")
# Module-level Firebase client instance
firebase_client: Optional[FirebaseClient] = None
def get_firebase_client(project_id: str = None) -> FirebaseClient:
    """Return the shared Firebase client, creating it on first use"""
    global firebase_client
    if firebase_client is None:
        if not project_id:
            import os
            project_id = os.getenv('GCP_PROJECT_ID')
        firebase_client = FirebaseClient(project_id)
    return firebase_client
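The progress update relies on dotted field paths such as `steps.rag_search.status`, which address a single nested field instead of replacing the whole `steps` map. The sketch below builds the same kind of payload as a pure function and applies it to a plain dict. `build_progress_update` and `apply_dotted_update` are hypothetical helpers, and `apply_dotted_update` is only a local stand-in for what `update()` does server-side, handy for unit tests that should not touch Firestore.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_progress_update(
    step: str,
    progress: int,
    status: str = "processing",
    message: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    now: Optional[datetime] = None,
) -> Dict[str, Any]:
    """Build the dotted-path payload for one progress update."""
    now = now or datetime.now(timezone.utc)
    update: Dict[str, Any] = {
        "current_step": step,
        "progress": progress,
        "status": status,
        f"steps.{step}.status": status,
        f"steps.{step}.progress": progress,  # written even when progress is 0
        f"steps.{step}.timestamp": now,
        "updated_at": now,
    }
    if message is not None:
        update[f"steps.{step}.message"] = message
    for key, value in (metadata or {}).items():
        update[f"metadata.{key}"] = value
    return update


def apply_dotted_update(doc: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Local stand-in for update(): dotted keys address nested map fields."""
    for path, value in update.items():
        *parents, leaf = path.split(".")
        target = doc
        for key in parents:
            target = target.setdefault(key, {})
        target[leaf] = value
    return doc


chat = {"steps": {"rag_search": {"status": "pending"}}, "metadata": {}}
apply_dotted_update(chat, build_progress_update("rag_search", 0, metadata={"rag_results_count": 5}))
print(chat["steps"]["rag_search"]["status"])
```

Separating payload construction from the Firestore call also makes it easy to assert that sibling steps are left untouched.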
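# services/chat/app/handlers.py (Firebase-integrated version)
import asyncio
import json
import logging
import time
import uuid
from datetime import datetime
from typing import Dict, Any
from shared.firebase_client import get_firebase_client
# ProcessingMode is assumed to live in .models next to the request/response types
from .models import ChatRequest, ChatResponse, ProcessingMode
logger = logging.getLogger(__name__)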
class ChatHandler:
    def __init__(self):
        # ... existing initialization ...
        self.firebase = get_firebase_client()
    async def process_chat(self, request: ChatRequest) -> ChatResponse:
        """Main chat processing logic (Firebase-integrated version)"""
        start_time = time.time()
        chat_id = request.chat_id or str(uuid.uuid4())
        try:
            # 1. Create the Firebase chat session
            await self.firebase.create_chat_session(
                chat_id,
                request.user_id,
                request.message
            )
            # 2. Load the user's context
            await self.firebase.update_chat_progress(
                chat_id,
                'load_context',
                10,
                'processing',
                'Loading conversation memory...'
            )
            context = await self._load_user_context(request.user_id, chat_id)
            # 3. Decide the processing mode
            processing_mode = self._determine_processing_mode(request.message, context)
            if processing_mode == ProcessingMode.SYNC:
                return await self._handle_sync_firebase(request, chat_id, context, start_time)
            else:
                return await self._handle_async_firebase(request, chat_id, context, start_time)
        except Exception as e:
            await self.firebase.error_chat(chat_id, str(e))
            raise
    async def _handle_sync_firebase(
        self,
        request: ChatRequest,
        chat_id: str,
        context: Dict,
        start_time: float
    ) -> ChatResponse:
        """Synchronous processing (Firebase version)"""
        try:
            # Update progress: LLM generation is starting
            await self.firebase.update_chat_progress(
                chat_id,
                'llm_generation',
                50,
                'processing',
                'Generating response...'
            )
            # Build the prompt and call the LLM
            prompt = self._build_simple_prompt(request.message, context)
            response = await self._call_gemini(prompt, max_tokens=150)
            # Save the conversation
            await self._save_conversation(chat_id, request.user_id, request.message, response)
            # Complete the chat
            processing_time = int((time.time() - start_time) * 1000)
            await self.firebase.complete_chat(
                chat_id,
                response,
                processing_time
            )
            return ChatResponse(
                message=response,
                chat_id=chat_id,
                processing_mode=ProcessingMode.SYNC,
                is_complete=True,
                requires_followup=False
            )
        except Exception as e:
            await self.firebase.error_chat(chat_id, str(e))
            raise
    async def _handle_async_firebase(
        self,
        request: ChatRequest,
        chat_id: str,
        context: Dict,
        start_time: float
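    ) -> ChatResponse:
        """Asynchronous processing (Firebase version)"""
        try:
            # Respond to the user immediately
            quick_response = "I've received your question and am working on it..."
            # Record the scheduled state before publishing, so this write cannot
            # land after (and overwrite) the worker's first progress update
            await self.firebase.update_chat_progress(
                chat_id,
                'task_scheduled',
                20,
                'processing',
                'Task scheduled; retrieving relevant documents...'
            )
            # Publish to Pub/Sub for background processing
            await self._publish_task_event_firebase(request, chat_id, context)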
            return ChatResponse(
                message=quick_response,
                chat_id=chat_id,
                processing_mode=ProcessingMode.ASYNC,
                is_complete=False,
                requires_followup=True,
                metadata={"estimated_time": "10-30 seconds"}
            )
        except Exception as e:
            await self.firebase.error_chat(chat_id, str(e))
            raise
    async def _publish_task_event_firebase(
        self,
        request: ChatRequest,
        chat_id: str,
        context: Dict
    ):
        """Publish the task event to Pub/Sub (including Firebase info)"""
        if not self.publisher:
            logger.info("Development environment: simulating task event publish")
            return
        try:
            event = {
                "task_id": str(uuid.uuid4()),
                "chat_id": chat_id,
                "user_id": request.user_id,
                "message": request.message,
                "context": context,
                "firebase_enabled": True,  # Flags that Firebase updates are enabled
                "created_at": datetime.now().isoformat()
            }
            message_data = json.dumps(event).encode('utf-8')
            future = self.publisher.publish(self.topic_path, message_data)
            logger.info(f"Task event published to Pub/Sub: {future.result()}")
        except Exception as e:
            logger.error(f"Failed to publish task event: {e}")
            await self.firebase.error_chat(chat_id, f"Failed to publish task event: {e}")
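One practical wrinkle with the task event is that `context` is passed straight to `json.dumps`, which fails if the context holds `datetime` values (for example timestamps loaded from Firestore). The sketch below shows one way to encode the event defensively. `encode_task_event` and `_json_default` are hypothetical helpers, and converting datetimes to ISO 8601 strings is an assumption about what the consumer expects.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict


def _json_default(value: Any) -> str:
    """Serialize datetimes as ISO 8601; reject anything else explicitly."""
    if isinstance(value, datetime):
        return value.isoformat()
    raise TypeError(f"not JSON serializable: {type(value).__name__}")


def encode_task_event(chat_id: str, user_id: str, message: str, context: Dict[str, Any]) -> bytes:
    """Build the task event and encode it as UTF-8 JSON bytes for Pub/Sub."""
    event = {
        "task_id": str(uuid.uuid4()),
        "chat_id": chat_id,
        "user_id": user_id,
        "message": message,
        "context": context,
        "firebase_enabled": True,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    return json.dumps(event, default=_json_default, ensure_ascii=False).encode("utf-8")


payload = encode_task_event(
    "chat-1", "user-1", "hello",
    {"last_seen": datetime(2024, 1, 1, tzinfo=timezone.utc)},
)
print(len(payload) > 0)
```

Failing loudly on unknown types keeps unserializable objects from silently turning into opaque strings in the queue.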
# functions/chat_event_handler/main.py
import functions_framework
from google.cloud import pubsub_v1
import base64
import json
import asyncio
import logging
from shared.firebase_client import get_firebase_client
from services.rag.app.agent_builder_client import AgentBuilderClient
import google.generativeai as genai
logger = logging.getLogger(__name__)
# Module-level initialization (reused across warm invocations)
firebase_client = get_firebase_client()
agent_builder = AgentBuilderClient(project_id="your-project-id")
@functions_framework.cloud_event
def handle_chat_task(cloud_event):
    """Handle a chat task event"""
    try:
        # Parse the Pub/Sub message (the payload is base64-encoded JSON)
        message_data = cloud_event.data["message"]["data"]
        task_data = json.loads(base64.b64decode(message_data).decode())
        # Run the async processing
        asyncio.run(process_chat_task(task_data))
    except Exception as e:
        logger.error(f"Failed to handle chat task: {e}")
async def process_chat_task(task_data: dict):
    """Main logic for processing a chat task"""
    chat_id = task_data["chat_id"]
    user_id = task_data["user_id"]
    message = task_data["message"]
    try:
        # Step 1: RAG retrieval
        await firebase_client.update_chat_progress(
            chat_id,
            'rag_search',
            30,
            'processing',
            'Retrieving relevant documents...'
        )
        rag_results = await perform_rag_search(message, user_id)
        # Step 2: LLM generation
        await firebase_client.update_chat_progress(
            chat_id,
            'llm_generation',
            60,
            'processing',
            'Generating a detailed answer...'
        )
        final_response = await generate_rag_response(message, rag_results)
        # Step 3: Streamed response (optional)
        # Note: the chunks are stored as separate partial messages, and
        # complete_chat below appends the full text again, so a client should
        # hide partial messages once the final assistant message arrives.
        if len(final_response) > 500:
            await stream_response(chat_id, final_response)
        # Step 4: Finish processing
        await firebase_client.complete_chat(
            chat_id,
            final_response,
            processing_time_ms=0,  # Hard to measure precisely inside a Cloud Function
            sources=extract_sources(rag_results)
        )
    except Exception as e:
        await firebase_client.error_chat(chat_id, str(e))
        logger.error(f"Task processing failed {chat_id}: {e}")
async def perform_rag_search(query: str, user_id: str) -> dict:
    """Run the RAG retrieval"""
    try:
        # Retrieve documents through Agent Builder
        results = await agent_builder.search_documents(
            query=query,
            page_size=5
        )
        return results
    except Exception as e:
        logger.error(f"RAG retrieval failed: {e}")
        return {"results": []}
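async def generate_rag_response(query: str, rag_results: dict) -> str:
    """Generate a response based on the RAG results"""
    try:
        # Build the augmented prompt
        context_parts = []
        for result in rag_results.get("results", [])[:3]:
            context_parts.append(f"Reference: {result.get('content', '')[:300]}")
        enhanced_prompt = f"""
Answer the question based on the following references:
{chr(10).join(context_parts)}
Question: {query}
Please give a detailed, accurate answer and cite the reference sources at the end.
        """.strip()
        # Call the Gemini API; the key is read from the GEMINI_API_KEY environment
        # variable (set in function.yaml) instead of being hard-coded
        import os
        genai.configure(api_key=os.environ["GEMINI_API_KEY"])
        model = genai.GenerativeModel('gemini-pro')
        response = model.generate_content(enhanced_prompt)
        return response.text
    except Exception as e:
        logger.error(f"Failed to generate response: {e}")
        return "Sorry, I ran into a problem while processing your question. Please try again later."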
async def stream_response(chat_id: str, full_response: str):
    """Push a long response in chunks"""
    try:
        # Split the long response into 200-character chunks
        chunks = [full_response[i:i+200] for i in range(0, len(full_response), 200)]
        for i, chunk in enumerate(chunks):
            progress = int((i + 1) / len(chunks) * 30) + 70  # 70-100%
            # Append the partial response
            await firebase_client.add_message(
                chat_id,
                'assistant',
                chunk,
                partial=True
            )
            # Update progress
            await firebase_client.update_chat_progress(
                chat_id,
                'streaming_response',
                progress,
                'processing',
                f'Pushing response... ({i+1}/{len(chunks)})'
            )
            # Short delay to simulate a typing effect
            await asyncio.sleep(0.5)
    except Exception as e:
        logger.error(f"Streaming response failed: {e}")
def extract_sources(rag_results: dict) -> list:
    """Extract RAG source info"""
    sources = []
    for result in rag_results.get("results", [])[:3]:
        sources.append({
            "title": result.get("title", "Unknown source"),
            "content_preview": result.get("content", "")[:100] + "...",
            "metadata": result.get("metadata", {})
        })
    return sources
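The handler's first job is unwrapping the Pub/Sub envelope: the CloudEvent's data carries a `message.data` field holding the base64-encoded JSON that the chat-service published. A minimal, testable sketch of that step is shown below. `decode_pubsub_payload` is a hypothetical helper name, and the required-field list is taken from the keys that `process_chat_task` reads.

```python
import base64
import json
from typing import Any, Dict, Mapping


def decode_pubsub_payload(event_data: Mapping[str, Any]) -> Dict[str, Any]:
    """Extract and decode the JSON task from a Pub/Sub CloudEvent's data field."""
    try:
        encoded = event_data["message"]["data"]
    except (KeyError, TypeError) as exc:
        raise ValueError("event has no message.data field") from exc
    task = json.loads(base64.b64decode(encoded).decode("utf-8"))
    if not isinstance(task, dict):
        raise ValueError("task payload must be a JSON object")
    # These are the keys process_chat_task reads
    for field in ("chat_id", "user_id", "message"):
        if field not in task:
            raise ValueError(f"task is missing required field {field!r}")
    return task


# Simulate what Pub/Sub delivers for a published task
body = json.dumps({"chat_id": "chat-1", "user_id": "user-1", "message": "hello"})
envelope = {"message": {"data": base64.b64encode(body.encode("utf-8")).decode("ascii")}}
print(decode_pubsub_payload(envelope)["chat_id"])
```

Validating the payload up front means a malformed message fails with a clear error before any Firestore write is attempted.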
# functions/chat_event_handler/function.yaml
name: chat-event-handler
runtime: python310
source: .
entry_point: handle_chat_task
trigger:
  event_trigger:
    event_type: google.cloud.pubsub.topic.v1.messagePublished
    resource: projects/YOUR_PROJECT/topics/chat-tasks
environment_variables:
  GCP_PROJECT_ID: "your-project-id"
  GEMINI_API_KEY: "your-gemini-api-key"
resources:
  memory: 512MB
  cpu: 1
  timeout: 540s
service_account_email: "chat-functions-sa@your-project.iam.gserviceaccount.com"
#!/bin/bash
# scripts/deploy-functions.sh
PROJECT_ID="your-project-id"
echo "☁️ Deploying Cloud Functions..."
# 1. Create the service account
gcloud iam service-accounts create chat-functions-sa \
    --project="$PROJECT_ID" \
    --display-name="Chat Functions Service Account"
# 2. Grant permissions (Firestore access uses the roles/datastore.* roles)
gcloud projects add-iam-policy-binding "$PROJECT_ID" \
    --member="serviceAccount:chat-functions-sa@$PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/datastore.user"
gcloud projects add-iam-policy-binding "$PROJECT_ID" \
    --member="serviceAccount:chat-functions-sa@$PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/pubsub.subscriber"
# 3. Deploy the function (2nd gen, matching the CloudEvent handler signature)
gcloud functions deploy chat-event-handler \
    --gen2 \
    --project="$PROJECT_ID" \
    --runtime=python310 \
    --trigger-topic=chat-tasks \
    --source=functions/chat_event_handler \
    --entry-point=handle_chat_task \
    --service-account="chat-functions-sa@$PROJECT_ID.iam.gserviceaccount.com" \
    --set-env-vars="GCP_PROJECT_ID=$PROJECT_ID" \
    --memory=512MB \
    --timeout=540s \
    --region=asia-east1
echo "✅ Cloud Functions deployment complete"
// frontend/src/firebase/config.js
import { initializeApp } from 'firebase/app';
import { getFirestore } from 'firebase/firestore';
import { getAuth } from 'firebase/auth';
import { getMessaging } from 'firebase/messaging';
const firebaseConfig = {
  apiKey: "your-api-key",
  authDomain: "your-project.firebaseapp.com",
  projectId: "your-project-id",
  storageBucket: "your-project.appspot.com",
  messagingSenderId: "123456789",
  appId: "your-app-id"
};
// Initialize Firebase
const app = initializeApp(firebaseConfig);
// Initialize services
export const db = getFirestore(app);
export const auth = getAuth(app);
export const messaging = getMessaging(app);
export default app;
// frontend/src/components/ChatInterface.jsx
import React, { useState, useEffect, useRef } from 'react';
import { doc, onSnapshot, collection, addDoc } from 'firebase/firestore';
import { db } from '../firebase/config';
import { useAuth } from '../hooks/useAuth';
const ChatInterface = () => {
  const [messages, setMessages] = useState([]);
  const [inputMessage, setInputMessage] = useState('');
  const [chatStatus, setChatStatus] = useState(null);
  const [currentProgress, setCurrentProgress] = useState(0);
  const [isLoading, setIsLoading] = useState(false);
  const { user } = useAuth();
  const messagesEndRef = useRef(null);
  // Kept in state rather than a ref so that a new chat ID re-runs the listener effect
  const [currentChatId, setCurrentChatId] = useState(null);
  // Scroll to the latest message
  const scrollToBottom = () => {
    messagesEndRef.current?.scrollIntoView({ behavior: "smooth" });
  };
  useEffect(() => {
    scrollToBottom();
  }, [messages]);
  // Listen for chat status updates
  useEffect(() => {
    if (!currentChatId) return;
    const unsubscribe = onSnapshot(
      doc(db, 'chats', currentChatId),
      (snapshot) => {
        if (snapshot.exists()) {
          const data = snapshot.data();
          // Update messages
          if (data.messages) {
            setMessages(data.messages);
          }
          // Update progress and status
          setChatStatus(data.status);
          setCurrentProgress(data.progress || 0);
          // Stop the loading state once the chat completes or fails
          if (data.status === 'completed' || data.status === 'error') {
            setIsLoading(false);
          }
        }
      },
      (error) => {
        console.error('Failed to listen for chat updates:', error);
        setIsLoading(false);
      }
    );
    return () => unsubscribe();
  }, [currentChatId]);
  // Send a message
  const sendMessage = async () => {
    if (!inputMessage.trim() || isLoading || !user) return;
    setIsLoading(true);
    const messageText = inputMessage;
    setInputMessage('');
    try {
      // Generate a new chat ID (a UUID avoids collisions between users)
      const chatId = crypto.randomUUID();
      // Call the backend API to start processing
      const response = await fetch('/api/chat', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${await user.getIdToken()}`
        },
        body: JSON.stringify({
          message: messageText,
          user_id: user.uid,
          chat_id: chatId,
          processing_mode: 'async'
        })
      });
      if (!response.ok) {
        throw new Error('Failed to send message');
      }
      // Start listening only after the backend has created the chat document:
      // the security rules compare against resource.data.user_id, so a listener
      // on a document that does not exist yet would be rejected
      setCurrentChatId(chatId);
      // The backend handles the Firestore updates; the frontend only listens
    } catch (error) {
      console.error('Failed to send message:', error);
      setIsLoading(false);
      alert('Failed to send message. Please try again later.');
    }
  };
  // Send on Enter (Shift+Enter inserts a newline)
  const handleKeyPress = (e) => {
    if (e.key === 'Enter' && !e.shiftKey) {
      e.preventDefault();
      sendMessage();
    }
  };
  return (
    <div className="chat-interface">
      {/* Chat area */}
      <div className="chat-messages">
        {messages.map((message, index) => (
          <div key={index} className={`message ${message.role}`}>
            <div className="message-content">
              {message.content}
              {message.partial && <span className="typing-indicator">...</span>}
            </div>
            {message.sources && (
              <div className="message-sources">
                <strong>Sources:</strong>
                {message.sources.map((source, idx) => (
                  <span key={idx} className="source-tag">
                    {source.title}
                  </span>
                ))}
              </div>
            )}
            <div className="message-time">
              {message.timestamp?.toDate?.().toLocaleTimeString() ?? ''}
            </div>
          </div>
        ))}
        {/* Loading indicator and progress bar */}
        {isLoading && (
          <div className="loading-indicator">
            <div className="progress-container">
              <div className="progress-bar">
                <div
                  className="progress-fill"
                  style={{ width: `${currentProgress}%` }}
                ></div>
              </div>
              <div className="progress-text">
                {getProgressMessage(chatStatus, currentProgress)}
              </div>
            </div>
          </div>
        )}
        <div ref={messagesEndRef} />
      </div>
      {/* Input area */}
      <div className="chat-input">
        <textarea
          value={inputMessage}
          onChange={(e) => setInputMessage(e.target.value)}
          onKeyDown={handleKeyPress}
          placeholder="Type your question..."
          disabled={isLoading}
          rows="3"
        />
        <button
          onClick={sendMessage}
          disabled={isLoading || !inputMessage.trim()}
          className="send-button"
        >
          {isLoading ? 'Processing...' : 'Send'}
        </button>
      </div>
    </div>
  );
};
// Map the current progress to a status message
const getProgressMessage = (status, progress) => {
  if (progress < 20) return 'Receiving your question...';
  if (progress < 40) return 'Retrieving relevant documents...';
  if (progress < 70) return 'Analyzing and generating the answer...';
  if (progress < 90) return 'Polishing the answer...';
  return 'Almost done...';
};
export default ChatInterface;
/* frontend/src/components/ChatInterface.css */
.chat-interface {
  display: flex;
  flex-direction: column;
  height: 100vh;
  max-width: 800px;
  margin: 0 auto;
  border: 1px solid #e0e0e0;
  border-radius: 8px;
  overflow: hidden;
}
.chat-messages {
  flex: 1;
  overflow-y: auto;
  padding: 20px;
  background-color: #f8f9fa;
}
.message {
  margin-bottom: 15px;
  animation: fadeIn 0.3s ease-in;
}
.message.user {
  text-align: right;
}
.message.assistant {
  text-align: left;
}
.message-content {
  display: inline-block;
  max-width: 70%;
  padding: 12px 16px;
  border-radius: 18px;
  word-wrap: break-word;
}
.message.user .message-content {
  background-color: #007bff;
  color: white;
}
.message.assistant .message-content {
  background-color: white;
  border: 1px solid #e0e0e0;
  color: #333;
}
.typing-indicator {
  animation: pulse 1.5s infinite;
  color: #666;
}
.message-sources {
  margin-top: 8px;
  font-size: 12px;
  color: #666;
}
.source-tag {
  background-color: #e3f2fd;
  padding: 2px 6px;
  border-radius: 10px;
  margin-right: 5px;
  font-size: 10px;
}
.message-time {
  font-size: 11px;
  color: #999;
  margin-top: 5px;
}
.loading-indicator {
  text-align: center;
  padding: 20px;
  background-color: white;
  border-radius: 10px;
  margin: 10px 0;
  border: 1px solid #e0e0e0;
}
.progress-container {
  margin: 10px 0;
}
.progress-bar {
  width: 100%;
  height: 6px;
  background-color: #e0e0e0;
  border-radius: 3px;
  overflow: hidden;
}
.progress-fill {
  height: 100%;
  background: linear-gradient(90deg, #007bff, #0056b3);
  border-radius: 3px;
  transition: width 0.3s ease;
  animation: shimmer 2s infinite;
}
.progress-text {
  margin-top: 10px;
  color: #666;
  font-size: 14px;
}
.chat-input {
  display: flex;
  padding: 20px;
  border-top: 1px solid #e0e0e0;
  background-color: white;
}
.chat-input textarea {
  flex: 1;
  border: 1px solid #ddd;
  border-radius: 20px;
  padding: 12px 16px;
  font-size: 14px;
  resize: none;
  outline: none;
}
.chat-input textarea:focus {
  border-color: #007bff;
}
.send-button {
  margin-left: 10px;
  background-color: #007bff;
  color: white;
  border: none;
  border-radius: 20px;
  padding: 12px 24px;
  cursor: pointer;
  font-size: 14px;
  transition: background-color 0.2s;
}
.send-button:hover:not(:disabled) {
  background-color: #0056b3;
}
.send-button:disabled {
  background-color: #ccc;
  cursor: not-allowed;
}
/* Animations */
@keyframes fadeIn {
  from { opacity: 0; transform: translateY(10px); }
  to { opacity: 1; transform: translateY(0); }
}
@keyframes pulse {
  0%, 100% { opacity: 1; }
  50% { opacity: 0.5; }
}
@keyframes shimmer {
  0% { background-position: -200px 0; }
  100% { background-position: 200px 0; }
}
.progress-fill {
  background: linear-gradient(
    90deg,
    #007bff 25%,
    #66b3ff 50%,
    #007bff 75%
  );
  background-size: 200px 100%;
  animation: shimmer 2s infinite;
}
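// frontend/public/firebase-messaging-sw.js
// Note: browsers cannot resolve bare specifiers such as 'firebase/app', so this
// file has to go through a bundler before it is served as the service worker.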
import { initializeApp } from 'firebase/app';
import { getMessaging, onBackgroundMessage } from 'firebase/messaging/sw';
const firebaseConfig = {
  // Your Firebase config
};
const app = initializeApp(firebaseConfig);
const messaging = getMessaging(app);
// Handle background messages
onBackgroundMessage(messaging, (payload) => {
  console.log('Received background push:', payload);
  const notificationTitle = payload.notification.title;
  const notificationOptions = {
    body: payload.notification.body,
    icon: '/icon-192x192.png',
    badge: '/badge-72x72.png',
    data: payload.data,
    actions: [
      {
        action: 'open_chat',
        title: 'View conversation'
      }
    ]
  };
  self.registration.showNotification(notificationTitle, notificationOptions);
});
// Handle notification clicks
self.addEventListener('notificationclick', (event) => {
  event.notification.close();
  if (event.action === 'open_chat') {
    const chatId = event.notification.data.chat_id;
    const url = `${self.location.origin}/chat/${chatId}`;
    event.waitUntil(
      clients.openWindow(url)
    );
  }
});
// frontend/src/hooks/useNotifications.js
import { useState, useEffect } from 'react';
import { getMessaging, getToken, onMessage } from 'firebase/messaging';
import { messaging } from '../firebase/config';
export const useNotifications = (user) => {
  const [notificationPermission, setNotificationPermission] = useState(
    Notification.permission
  );
  const [fcmToken, setFcmToken] = useState(null);
  useEffect(() => {
    if (!user) return;
    requestNotificationPermission();
    // Remove the foreground listener when the user changes or the hook unmounts
    const unsubscribe = setupMessageListener();
    return () => unsubscribe();
  }, [user]);
  const requestNotificationPermission = async () => {
    try {
      const permission = await Notification.requestPermission();
      setNotificationPermission(permission);
      if (permission === 'granted') {
        const token = await getToken(messaging, {
          vapidKey: 'your-vapid-key'
        });
        setFcmToken(token);
        // Send the token to the backend for storage
        await saveFcmToken(user.uid, token);
      }
    } catch (error) {
      console.error('Failed to get notification permission:', error);
    }
  };
  const setupMessageListener = () => {
    // onMessage returns an unsubscribe function
    return onMessage(messaging, (payload) => {
      console.log('Received foreground push:', payload);
      // Show a browser notification; read the live permission rather than the
      // state value captured when this listener was registered
      if (Notification.permission === 'granted' && payload.notification) {
        new Notification(payload.notification.title, {
          body: payload.notification.body,
          icon: payload.notification.icon,
          data: payload.data
        });
      }
    });
  };
  const saveFcmToken = async (userId, token) => {
    try {
      await fetch('/api/user/fcm-token', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${await user.getIdToken()}`
        },
        body: JSON.stringify({ userId, fcmToken: token })
      });
    } catch (error) {
      console.error('儲存 FCM Token 失敗:', error);
    }
  };
  return {
    notificationPermission,
    fcmToken,
    requestNotificationPermission
  };
};
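# shared/fcm_service.py
import asyncio
import logging
from typing import Optional
from firebase_admin import messaging
logger = logging.getLogger(__name__)
class FCMService:
    """Firebase Cloud Messaging service."""
    def __init__(self):
        # Assumes firebase_admin.initialize_app() has already run at service startup
        pass
    async def send_notification(
        self,
        fcm_token: str,
        title: str,
        body: str,
        data: Optional[dict] = None
    ) -> bool:
        """Send a push notification; return True on success."""
        try:
            message = messaging.Message(
                notification=messaging.Notification(
                    title=title,
                    body=body
                ),
                # FCM data payloads must map strings to strings
                data={str(k): str(v) for k, v in (data or {}).items()},
                token=fcm_token
            )
            # messaging.send() is blocking, so run it off the event loop
            response = await asyncio.to_thread(messaging.send, message)
            logger.info(f"Push notification sent: {response}")
            return True
        except Exception as e:
            logger.error(f"Failed to send push notification: {e}")
            return False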
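    async def send_chat_completion_notification(
        self,
        fcm_token: str,
        chat_id: str,
        preview: str
    ) -> bool:
        """Notify the user that the assistant's reply is ready."""
        # Truncate long previews to 50 characters
        short_preview = (preview[:50] + "...") if len(preview) > 50 else preview
        return await self.send_notification(
            fcm_token,
            "AI assistant reply ready",
            short_preview,
            {
                "type": "chat_completion",
                "chat_id": chat_id
            }
        )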
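    async def send_processing_notification(
        self,
        fcm_token: str,
        chat_id: str,
        progress: int
    ):
        """Send a processing-progress notification."""
        # Send only once, at exactly 50%, to avoid flooding the user.
        # Note: this never fires if no step reports exactly 50.
        if progress == 50:
            await self.send_notification(
                fcm_token,
                "Processing your question",
                f"Progress: {progress}%, about 10-15 seconds remaining",
                {
                    "type": "processing_update",
                    "chat_id": chat_id,
                    "progress": str(progress)
                }
            )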
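The `progress == 50` check in `send_processing_notification` fires only when a step reports exactly 50. One possible alternative is a threshold-crossing check. The sketch below assumes the caller keeps the previously reported progress value; the helper name `crossed_threshold` and its parameters are hypothetical and not part of the original service.

```python
def crossed_threshold(previous: int, current: int, threshold: int = 50) -> bool:
    """Return True only for the update that moves progress from below the
    threshold to at or above it."""
    return previous < threshold <= current


# Example: progress jumps from 30 to 65 and never reports exactly 50
updates = [0, 30, 65, 90, 100]
notified_at = [
    current
    for previous, current in zip(updates, updates[1:])
    if crossed_threshold(previous, current)
]
print(notified_at)  # prints [65]
```

With this check the notification is sent exactly once per chat, on the first update at or past the threshold, regardless of how coarse the progress steps are.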
# docker-compose.dev.yml
version: '3.8'
services:
  # Chat Service
  chat-service:
    build:
      context: .
      dockerfile: services/chat/Dockerfile
    ports:
      - "8080:8080"
    environment:
      - ENVIRONMENT=development
      - GCP_PROJECT_ID=your-project-id
      - FIREBASE_ENABLED=true
      # Services reach each other on the container port (8080), not the host-mapped port
      - MEMORY_SERVICE_URL=http://memory-service:8080
      - RAG_SERVICE_URL=http://rag-service:8080
      - GEMINI_API_KEY=${GEMINI_API_KEY}
    depends_on:
      - memory-service
      - rag-service
  # Memory Service
  memory-service:
    build:
      context: .
      dockerfile: services/memory/Dockerfile
    ports:
      - "8081:8080"
    environment:
      - DATABASE_URL=postgresql://postgres:password@postgres:5432/ai_assistant
    depends_on:
      - postgres
  # RAG Service (Agent Builder)
  rag-service:
    build:
      context: .
      dockerfile: services/rag/Dockerfile
    ports:
      - "8082:8080"
    environment:
      - GCP_PROJECT_ID=your-project-id
      - DOCUMENTS_BUCKET=your-project-id-documents
  # PostgreSQL Database
  postgres:
    image: postgres:15
    environment:
      - POSTGRES_DB=ai_assistant
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
  # Frontend Development Server
  frontend:
    build:
      context: ./frontend
      dockerfile: Dockerfile.dev
    ports:
      - "3000:3000"
    environment:
      - REACT_APP_API_BASE_URL=http://localhost:8080
      - REACT_APP_FIREBASE_API_KEY=${FIREBASE_API_KEY}
      - REACT_APP_FIREBASE_PROJECT_ID=your-project-id
    volumes:
      - ./frontend/src:/app/src
      - ./frontend/public:/app/public
volumes:
  postgres_data:
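#!/bin/bash
# scripts/deploy-with-firebase.sh
# Stop at the first failing step instead of continuing with a partial deploy
set -euo pipefail
PROJECT_ID="your-project-id"
REGION="asia-east1"
echo "🚀 Deploying the complete real-time push system..."
# 1. Deploy backend services
echo "📦 Deploying backend services..."
./scripts/deploy-backend-services.sh
# 2. Deploy Cloud Functions
echo "☁️ Deploying Cloud Functions..."
./scripts/deploy-functions.sh
# 3. Set up Firestore and Firebase
echo "🔥 Setting up Firebase..."
./scripts/setup-firestore.sh
# 4. Deploy the frontend to Firebase Hosting (subshell keeps the working directory unchanged)
echo "🌐 Deploying frontend..."
(
  cd frontend
  npm run build
  firebase deploy --only hosting --project "$PROJECT_ID"
)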
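# 5. FCM needs no separate deploy step: the Firebase CLI has no "messaging" deploy target.
#    The messaging service worker is served as part of the frontend deployed in step 4.
echo "📱 Push notifications: service worker deployed with the frontend"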
# 6. Test the full flow
echo "🧪 Testing real-time push..."
./scripts/test-realtime-flow.sh
echo "✅ Real-time push system deployed!"
echo ""
echo "🔗 Application URLs:"
echo "  Frontend: https://${PROJECT_ID}.web.app"
echo "  API: https://chat-service-xxx.run.app"
echo ""
echo "📊 Monitoring dashboards:"
echo "  Firebase console: https://console.firebase.google.com/project/${PROJECT_ID}"
echo "  Cloud Functions logs: https://console.cloud.google.com/functions"
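#!/bin/bash
# scripts/test-realtime-flow.sh
set -euo pipefail
echo "🧪 Testing the real-time push flow..."
API_BASE="https://chat-service-xxx.run.app"
TEST_USER_ID="test-user-123"
TEST_MESSAGE="Please analyze the development trends of artificial intelligence"
# 1. Create a chat session
echo "1. Creating a chat session..."
# Build the JSON body with jq so the message is quoted and escaped safely
PAYLOAD=$(jq -n --arg message "$TEST_MESSAGE" --arg user_id "$TEST_USER_ID" \
  '{message: $message, user_id: $user_id, processing_mode: "async"}')
CHAT_RESPONSE=$(curl -s -X POST "$API_BASE/chat" \
  -H "Content-Type: application/json" \
  -d "$PAYLOAD")
CHAT_ID=$(echo "$CHAT_RESPONSE" | jq -r '.chat_id')
# jq prints the literal string "null" when the field is missing
if [ -z "$CHAT_ID" ] || [ "$CHAT_ID" = "null" ]; then
  echo "❌ No chat_id in response: $CHAT_RESPONSE"
  exit 1
fi
echo "Chat ID: $CHAT_ID"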
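# 2. Monitor Firestore updates (simulated)
echo "2. Simulating Firestore update monitoring..."
for i in {1..30}; do
  echo "Checking progress... ($i/30)"
  # A real test would query Firestore here;
  # this script only simulates the wait
  sleep 2
  if [ "$i" -eq 15 ]; then
    echo "✅ Progress update detected: RAG retrieval complete"
  fi
  if [ "$i" -eq 25 ]; then
    echo "✅ Progress update detected: LLM generation complete"
  fi
done
echo "✅ Real-time push flow test complete"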
# shared/firestore_monitor.py
from google.cloud import firestore
from google.cloud import monitoring_v3
import time
import logging
logger = logging.getLogger(__name__)
class FirestoreMonitor:
    """Firestore usage monitoring."""
    def __init__(self, project_id: str):
        self.project_id = project_id
        # Pass the project explicitly so both clients target the same project
        self.db = firestore.Client(project=project_id)
        self.monitoring_client = monitoring_v3.MetricServiceClient()
        self.project_name = f"projects/{project_id}"
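    async def record_chat_metrics(
        self,
        chat_id: str,
        user_id: str,
        processing_time_ms: int,
        message_count: int
    ):
        """Record per-chat metrics."""
        try:
            # Write to the Firestore stats collection
            stats_ref = self.db.collection('chat_stats').document(chat_id)
            stats_ref.set({
                'user_id': user_id,
                'processing_time_ms': processing_time_ms,
                'message_count': message_count,
                'timestamp': firestore.SERVER_TIMESTAMP
            })
            # Write to Cloud Monitoring
            await self._record_processing_time(processing_time_ms)
            await self._record_message_count(message_count)
        except Exception as e:
            logger.error(f"Failed to record chat metrics: {e}")
    async def _record_processing_time(self, processing_time_ms: int):
        """Record the processing-time metric."""
        await self._write_int_metric(
            "custom.googleapis.com/chat/processing_time", processing_time_ms
        )
    async def _record_message_count(self, message_count: int):
        """Record the message-count metric.
        The original called this method without defining it; the metric name
        below is an assumption modeled on the processing-time metric.
        """
        await self._write_int_metric(
            "custom.googleapis.com/chat/message_count", message_count
        )
    async def _write_int_metric(self, metric_type: str, value: int):
        """Write one int64 data point to a custom Cloud Monitoring metric."""
        try:
            series = monitoring_v3.TimeSeries()
            series.metric.type = metric_type
            series.resource.type = "global"
            now = time.time()
            seconds = int(now)
            nanos = int((now - seconds) * 10 ** 9)
            interval = monitoring_v3.TimeInterval({
                "end_time": {"seconds": seconds, "nanos": nanos}
            })
            point = monitoring_v3.Point({
                "interval": interval,
                "value": {"int64_value": value}
            })
            series.points = [point]
            self.monitoring_client.create_time_series(
                name=self.project_name,
                time_series=[series]
            )
        except Exception as e:
            logger.error(f"Failed to write metric {metric_type}: {e}")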
    async def get_daily_stats(self, date: str = None) -> dict:
        """Return aggregate stats for one day (ISO date string, interpreted as UTC)."""
        try:
            from datetime import datetime, timedelta, timezone
            if not date:
                date = datetime.now(timezone.utc).date().isoformat()
            # 'timestamp' is stored as a Firestore timestamp, so compare against
            # datetime bounds; string bounds would never match
            day_start = datetime.fromisoformat(date).replace(tzinfo=timezone.utc)
            day_end = day_start + timedelta(days=1)
            # Query that day's stats
            stats_query = self.db.collection('chat_stats') \
                .where('timestamp', '>=', day_start) \
                .where('timestamp', '<', day_end)
            docs = stats_query.stream()
            total_chats = 0
            total_processing_time = 0
            total_messages = 0
            for doc in docs:
                data = doc.to_dict()
                total_chats += 1
                total_processing_time += data.get('processing_time_ms', 0)
                total_messages += data.get('message_count', 0)
            return {
                'date': date,
                'total_chats': total_chats,
                'average_processing_time_ms': total_processing_time / total_chats if total_chats > 0 else 0,
                'total_messages': total_messages,
                'average_messages_per_chat': total_messages / total_chats if total_chats > 0 else 0
            }
        except Exception as e:
            logger.error(f"Failed to fetch daily stats: {e}")
            return {}
# shared/performance_optimizer.py
from typing import List, Dict, Any
import logging
logger = logging.getLogger(__name__)
class FirebaseOptimizer:
    """Firebase performance optimizer."""
    @staticmethod
    def batch_firestore_writes(updates: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]:
        """Split updates into chunks sized for Firestore batched writes."""
        # At most 500 operations per batch
        batch_size = 500
        return [
            updates[i:i + batch_size]
            for i in range(0, len(updates), batch_size)
        ]
    @staticmethod
    def optimize_listener_queries(user_id: str) -> Dict[str, Any]:
        """Build a narrowed Firestore listener query for one user."""
        # Listen only to this user's documents to avoid unnecessary reads
        return {
            'collection': 'chats',
            'where': [('user_id', '==', user_id)],
            'orderBy': ('updated_at', 'desc'),
            'limit': 10  # listen only to the 10 most recent chats
        }
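    @staticmethod
    async def cleanup_old_chats(firebase_client, days: int = 30):
        """Delete chats not updated in the last `days` days (at most 100 per call)."""
        try:
            from datetime import datetime, timedelta, timezone
            # Firestore timestamps are UTC, so use a timezone-aware cutoff
            cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)
            # Query old chats
            old_chats_query = firebase_client.db.collection('chats') \
                .where('updated_at', '<', cutoff_date) \
                .limit(100)  # process in batches
            # Commit all deletes in one batched write instead of one round trip per document
            batch = firebase_client.db.batch()
            deleted_count = 0
            for doc in old_chats_query.stream():
                batch.delete(doc.reference)
                deleted_count += 1
            if deleted_count:
                batch.commit()
            logger.info(f"Cleaned up {deleted_count} old chat records")
        except Exception as e:
            logger.error(f"Failed to clean up old chat records: {e}")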